Conversation
There was a problem hiding this comment.
Pull request overview
This PR tunes DocLoader’s throughput controls by increasing per-batch work sizes, raising Couchbase SDK connection concurrency via pooled clients, and adjusting the per-second throttling behavior.
Changes:
- Reworked
batchSizecomputation to scale more aggressively (and capped it in one constructor). - Switched Loader execution to use an
SDKClientPooland increased pool sizes (including SIFTLoader). - Increased Reactor
flatMapconcurrency in bulk KV operations and adjusted per-second throttling sleep logic.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| src/main/java/utils/docgen/WorkLoadSettings.java | Adjusts batch sizing logic to increase throughput (with inconsistent caps across constructors). |
| src/main/java/SIFTLoader.java | Increases Couchbase SDK client pool size from 2 to 32 for SIFT runs. |
| src/main/java/Loader.java | Migrates from a single SDKClient to SDKClientPool and reduces worker startup delay. |
| src/main/java/couchbase/sdk/SDKClientPool.java | Adds logging around client pool creation. |
| src/main/java/couchbase/sdk/SDKClient.java | Moves to per-client ClusterEnvironment and increases KV connections per client. |
| src/main/java/couchbase/sdk/DocOps.java | Adds explicit Reactor concurrency limits and switches GET decoding to JsonObject. |
| src/main/java/couchbase/loadgen/WorkLoadGenerate.java | Tweaks rate-limit sleep behavior to be less strict and allow bursts. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+75
to
+78
| // Improved batch size calculation with reasonable upper bound | ||
| // Previous batchSize = Math.max(1000, (ops/workers) * 4) could be 125,000+ with high ops | ||
| // New formula caps at 10,000 to prevent memory issues while maintaining throughput | ||
| this.batchSize = Math.min(10000, Math.max(1000, (this.ops/this.workers) * 4)); |
Comment on lines
+294
to
+297
| // Create SDK client pool with 5 client instances for connection isolation | ||
| // 5 pools × 200 connections per client = 1,000 total KV connections | ||
| // Each pool handles 6-7 workers (32 workers / 5 pools) | ||
| SDKClientPool clientPool = new SDKClientPool(); |
Comment on lines
+324
to
+331
| WorkLoadGenerate wlg = new WorkLoadGenerate(th_name, dg, clientPool, esClient, cmd.getOptionValue("durability", "NONE"), | ||
| Integer.parseInt(cmd.getOptionValue("maxTTL", "0")), | ||
| cmd.getOptionValue("maxTTLUnit", "seconds"), _trackFailures, | ||
| Integer.parseInt(cmd.getOptionValue("retry", "0")), null)); | ||
| TimeUnit.MILLISECONDS.sleep(500); | ||
| Integer.parseInt(cmd.getOptionValue("retry", "0")), null); | ||
| wlg.set_collection_for_load( | ||
| cmd.getOptionValue("bucket"), | ||
| cmd.getOptionValue("scope", "_default"), | ||
| cmd.getOptionValue("collection", "_default")); |
Comment on lines
340
to
+342
| tm.getAllTaskResult(); | ||
| tm.shutdown(); | ||
| client.disconnectCluster(); | ||
| client.shutdownEnv(); | ||
| clientPool.shutdown(); |
Comment on lines
+71
to
+83
| .ioConfig(IoConfig.enableDnsSrv(true)) | ||
| .ioConfig(IoConfig.numKvConnections(200)) // 200 per client instance | ||
| .ioConfig(IoConfig.configPollInterval(Duration.ofSeconds(10))) | ||
| .build(); | ||
| cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(this.environment); | ||
| } else { | ||
| this.environment = ClusterEnvironment.builder() | ||
| .timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10))) | ||
| .ioConfig(IoConfig.enableDnsSrv(true)) | ||
| .ioConfig(IoConfig.numKvConnections(200)) // 200 per client instance | ||
| .ioConfig(IoConfig.configPollInterval(Duration.ofSeconds(10))) | ||
| .build(); | ||
| cluster_options = ClusterOptions.clusterOptions(master.rest_username, master.rest_password).environment(this.environment); |
Comment on lines
224
to
228
| String cb = cmd.getOptionValue(skipCB.getOpt(), "false"); | ||
| if (!Boolean.parseBoolean(cb)) { | ||
| try { | ||
| clientPool.create_clients(cmd.getOptionValue("bucket"), master, 2); | ||
| clientPool.create_clients(cmd.getOptionValue("bucket"), master, 32); | ||
| } catch (Exception e) { |
Comment on lines
+41
to
52
| // Now 4000 since each worker has its own connection pool | ||
| int concurrency = Math.min(documents.size(), 4000); | ||
| return Flux.fromIterable(documents) | ||
| .flatMap(documentToInsert -> { | ||
| String k = documentToInsert.getT1(); | ||
| Object v = documentToInsert.getT2(); | ||
|
|
||
| return reactiveCollection.insert(k, v, insertOptions) | ||
| .then(Mono.<Result>empty()) | ||
| .onErrorResume(error -> Mono.just(new Result(k, v, error, false))); | ||
| }) | ||
| }, concurrency) | ||
| .collectList() |
Comment on lines
+88
to
+93
| Object content = null; | ||
| try { | ||
| content = result.contentAs(JsonObject.class); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } |
|
|
||
| public List<ConcurrentHashMap<String, Object>> bulkReplace(Collection collection, List<Tuple2<String, Object>> documents, | ||
| ReplaceOptions replaceOptions) { | ||
| final ReactiveCollection reactiveCollection = collection.reactive(); |
Comment on lines
157
to
161
| public List<ConcurrentHashMap<String, Object>> bulkTouch(Collection collection, List<String> keys, final int exp, | ||
| TouchOptions touchOptions, Duration exp_duration) { | ||
| final ReactiveCollection reactiveCollection = collection.reactive(); | ||
| int concurrency = Math.min(keys.size(), 2000); | ||
| List<ConcurrentHashMap<String, Object>> returnValue = Flux.fromIterable(keys) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.